Encrypt Backup Mutation Log (#8159)

* encrypt backup mutation log

* format

* address pr comments

* format

* fix bug

* revert knobs

* address pr comments
Author: Nim Wijetunga
Date: 2022-09-20 15:43:39 -07:00
Committed by: GitHub
Parent: 9628561235
Commit: eadb769cfa
8 changed files with 325 additions and 159 deletions

.gitignore (vendored): 1 line changed
View File

@@ -64,6 +64,7 @@ packaging/msi/obj
simfdb
tests/oldBinaries
trace.*.xml
trace.*.json
.venv
# Editor files

View File

@@ -22,6 +22,9 @@
#include <time.h>
#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/BlobCipher.h"
#include "fdbclient/GetEncryptCipherKeys.actor.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbrpc/simulator.h"
#include "flow/ActorCollection.h"
#include "flow/actorcompiler.h" // has to be last include
@@ -253,16 +256,18 @@ std::pair<Version, uint32_t> decodeBKMutationLogKey(Key key) {
bigEndian32(*(int32_t*)(key.begin() + backupLogPrefixBytes + sizeof(UID) + sizeof(uint8_t) + sizeof(int64_t))));
}
void decodeBackupLogValue(Arena& arena,
VectorRef<MutationRef>& result,
int& mutationSize,
StringRef value,
StringRef addPrefix,
StringRef removePrefix,
Version version,
Reference<KeyRangeMap<Version>> key_version) {
ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
VectorRef<MutationRef>* result,
VectorRef<Optional<MutationRef>>* encryptedResult,
int* mutationSize,
Standalone<StringRef> value,
Key addPrefix,
Key removePrefix,
Version version,
Reference<KeyRangeMap<Version>> key_version,
Database cx) {
try {
uint64_t offset(0);
state uint64_t offset(0);
uint64_t protocolVersion = 0;
memcpy(&protocolVersion, value.begin(), sizeof(uint64_t));
offset += sizeof(uint64_t);
@@ -274,36 +279,48 @@ void decodeBackupLogValue(Arena& arena,
throw incompatible_protocol_version();
}
uint32_t totalBytes = 0;
state uint32_t totalBytes = 0;
memcpy(&totalBytes, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
uint32_t consumed = 0;
state uint32_t consumed = 0;
if (totalBytes + offset > value.size())
throw restore_missing_data();
int originalOffset = offset;
state int originalOffset = offset;
while (consumed < totalBytes) {
uint32_t type = 0;
memcpy(&type, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
uint32_t len1 = 0;
state uint32_t len1 = 0;
memcpy(&len1, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
uint32_t len2 = 0;
state uint32_t len2 = 0;
memcpy(&len2, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
ASSERT(offset + len1 + len2 <= value.size() && isValidMutationType(type));
MutationRef logValue;
Arena tempArena;
state MutationRef logValue;
state Arena tempArena;
logValue.type = type;
logValue.param1 = value.substr(offset, len1);
offset += len1;
logValue.param2 = value.substr(offset, len2);
offset += len2;
state Optional<MutationRef> encryptedLogValue = Optional<MutationRef>();
// Decrypt mutation ref if encrypted
if (logValue.isEncrypted()) {
encryptedLogValue = logValue;
Reference<AsyncVar<ClientDBInfo> const> dbInfo = cx->clientInfo;
TextAndHeaderCipherKeys cipherKeys =
wait(getEncryptCipherKeys(dbInfo, *logValue.encryptionHeader(), BlobCipherMetrics::BACKUP));
logValue = logValue.decrypt(cipherKeys, tempArena, BlobCipherMetrics::BACKUP);
}
ASSERT(!logValue.isEncrypted());
MutationRef originalLogValue = logValue;
if (logValue.type == MutationRef::ClearRange) {
KeyRangeRef range(logValue.param1, logValue.param2);
@@ -320,8 +337,8 @@ void decodeBackupLogValue(Arena& arena,
logValue.param1 = logValue.param1.withPrefix(addPrefix, tempArena);
}
logValue.param2 = addPrefix == StringRef() ? normalKeys.end : strinc(addPrefix, tempArena);
result.push_back_deep(arena, logValue);
mutationSize += logValue.expectedSize();
result->push_back_deep(*arena, logValue);
*mutationSize += logValue.expectedSize();
} else {
logValue.param1 = std::max(r.range().begin, range.begin);
logValue.param2 = minKey;
@@ -333,8 +350,13 @@ void decodeBackupLogValue(Arena& arena,
logValue.param1 = logValue.param1.withPrefix(addPrefix, tempArena);
logValue.param2 = logValue.param2.withPrefix(addPrefix, tempArena);
}
result.push_back_deep(arena, logValue);
mutationSize += logValue.expectedSize();
result->push_back_deep(*arena, logValue);
*mutationSize += logValue.expectedSize();
}
if (originalLogValue.param1 == logValue.param1 && originalLogValue.param2 == logValue.param2) {
encryptedResult->push_back_deep(*arena, encryptedLogValue);
} else {
encryptedResult->push_back_deep(*arena, Optional<MutationRef>());
}
}
}
@@ -348,8 +370,15 @@ void decodeBackupLogValue(Arena& arena,
if (addPrefix.size()) {
logValue.param1 = logValue.param1.withPrefix(addPrefix, tempArena);
}
result.push_back_deep(arena, logValue);
mutationSize += logValue.expectedSize();
result->push_back_deep(*arena, logValue);
*mutationSize += logValue.expectedSize();
// If we did not remove/add prefixes to the mutation then keep the original encrypted mutation so we
// do not have to re-encrypt unnecessarily
if (originalLogValue.param1 == logValue.param1 && originalLogValue.param2 == logValue.param2) {
encryptedResult->push_back_deep(*arena, encryptedLogValue);
} else {
encryptedResult->push_back_deep(*arena, Optional<MutationRef>());
}
}
}
@@ -374,6 +403,7 @@ void decodeBackupLogValue(Arena& arena,
.detail("Value", value);
throw;
}
return Void();
}
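
For readers following the hunk above: decodeBackupLogValue walks a packed byte layout consisting of a 64-bit protocol version, a 32-bit payload length, and then repeated { type, len1, len2, param1, param2 } records; with this change, any record whose type marks it as encrypted is first decrypted (via getEncryptCipherKeys) before the prefix handling and range filtering. The following is a minimal standalone sketch of that layout only, with illustrative names, host-endian reads, and no protocol-version, prefix, or encryption handling; it is not the FoundationDB implementation.

// Minimal sketch of the backup mutation log value layout parsed by decodeBackupLogValue().
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

struct DecodedMutation {
	uint32_t type;
	std::string param1;
	std::string param2;
};

std::vector<DecodedMutation> decodeBackupLogValueSketch(const uint8_t* data, size_t size) {
	size_t offset = 0;
	uint64_t protocolVersion = 0; // checked against the minimum supported version in the real code
	std::memcpy(&protocolVersion, data + offset, sizeof(uint64_t));
	offset += sizeof(uint64_t);

	uint32_t totalBytes = 0; // total size of the mutation records that follow
	std::memcpy(&totalBytes, data + offset, sizeof(uint32_t));
	offset += sizeof(uint32_t);
	assert(totalBytes + offset <= size);

	std::vector<DecodedMutation> result;
	const size_t originalOffset = offset;
	while (offset - originalOffset < totalBytes) {
		uint32_t type = 0, len1 = 0, len2 = 0;
		std::memcpy(&type, data + offset, sizeof(uint32_t));
		offset += sizeof(uint32_t);
		std::memcpy(&len1, data + offset, sizeof(uint32_t));
		offset += sizeof(uint32_t);
		std::memcpy(&len2, data + offset, sizeof(uint32_t));
		offset += sizeof(uint32_t);
		assert(offset + len1 + len2 <= size);
		DecodedMutation m;
		m.type = type;
		m.param1.assign(reinterpret_cast<const char*>(data + offset), len1);
		offset += len1;
		m.param2.assign(reinterpret_cast<const char*>(data + offset), len2);
		offset += len2;
		result.push_back(std::move(m)); // an encrypted record would be decrypted at this point in the real code
	}
	return result;
}
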
static double lastErrorTime = 0;
@@ -614,21 +644,24 @@ ACTOR Future<int> dumpData(Database cx,
state int mutationSize = 0;
loop {
try {
RCGroup group = waitNext(results.getFuture());
state RCGroup group = waitNext(results.getFuture());
lock->release(group.items.expectedSize());
BinaryWriter bw(Unversioned());
for (int i = 0; i < group.items.size(); ++i) {
bw.serializeBytes(group.items[i].value);
}
decodeBackupLogValue(req.arena,
req.transaction.mutations,
mutationSize,
bw.toValue(),
addPrefix,
removePrefix,
group.groupKey,
keyVersion);
Standalone<StringRef> value = bw.toValue();
wait(decodeBackupLogValue(&req.arena,
&req.transaction.mutations,
&req.transaction.encryptedMutations,
&mutationSize,
value,
addPrefix,
removePrefix,
group.groupKey,
keyVersion,
cx));
newBeginVersion = group.groupKey + 1;
if (mutationSize >= CLIENT_KNOBS->BACKUP_LOG_WRITE_BATCH_MAX_SIZE) {
break;
@@ -652,8 +685,10 @@ ACTOR Future<int> dumpData(Database cx,
Key rangeEnd = getApplyKey(newBeginVersion, uid);
req.transaction.mutations.push_back_deep(req.arena, MutationRef(MutationRef::SetValue, applyBegin, versionKey));
req.transaction.encryptedMutations.push_back_deep(req.arena, Optional<MutationRef>());
req.transaction.write_conflict_ranges.push_back_deep(req.arena, singleKeyRange(applyBegin));
req.transaction.mutations.push_back_deep(req.arena, MutationRef(MutationRef::ClearRange, rangeBegin, rangeEnd));
req.transaction.encryptedMutations.push_back_deep(req.arena, Optional<MutationRef>());
req.transaction.write_conflict_ranges.push_back_deep(req.arena, singleKeyRange(rangeBegin));
// The commit request contains no read conflict ranges, so regardless of what read version we

View File

@@ -3328,6 +3328,14 @@ bool AccumulatedMutations::matchesAnyRange(const std::vector<KeyRange>& ranges)
std::vector<MutationRef> mutations = decodeMutationLogValue(serializedMutations);
for (auto& m : mutations) {
for (auto& r : ranges) {
if (m.type == MutationRef::Encrypted) {
// TODO: In order to filter out encrypted mutations that are not relevant to the
// target range, they would have to be decrypted here in order to check relevance
// below; however, the staged mutations would still need to remain encrypted for
// staging into the destination database. Without decrypting, we must assume that
// some data could match the range and return true here.
return true;
}
if (m.type == MutationRef::ClearRange) {
if (r.intersects(KeyRangeRef(m.param1, m.param2))) {
return true;
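
Since the hunk above is cut off at the ClearRange branch, a compact sketch of the conservative matching rule the TODO describes may help: an encrypted mutation cannot be range-filtered without decrypting it, so it must be treated as a potential match. The helper name is illustrative and this is not the actual matchesAnyRange body; it assumes the MutationRef and KeyRangeRef types from the surrounding FoundationDB headers.

// Illustrative per-mutation version of the conservative filter described above.
bool mutationMightMatchRange(const MutationRef& m, const KeyRangeRef& r) {
	if (m.type == MutationRef::Encrypted) {
		return true; // cannot inspect the keys without decrypting; assume it may be relevant
	}
	if (m.type == MutationRef::ClearRange) {
		return r.intersects(KeyRangeRef(m.param1, m.param2));
	}
	return r.contains(m.param1); // single-key mutation
}
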

View File

@@ -24,6 +24,7 @@
#include "fdbclient/BlobCipher.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/GetEncryptCipherKeys.actor.h"
#include "fdbclient/Knobs.h"
#include "fdbclient/Tracing.h"
@@ -171,6 +172,22 @@ struct MutationRef {
return encrypt(cipherKeys, SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID, arena, usageType);
}
MutationRef decrypt(TextAndHeaderCipherKeys cipherKeys,
Arena& arena,
BlobCipherMetrics::UsageType usageType,
StringRef* buf = nullptr) const {
const BlobCipherEncryptHeader* header = encryptionHeader();
DecryptBlobCipherAes256Ctr cipher(cipherKeys.cipherTextKey, cipherKeys.cipherHeaderKey, header->iv, usageType);
StringRef plaintext = cipher.decrypt(param2.begin(), param2.size(), *header, arena)->toStringRef();
if (buf != nullptr) {
*buf = plaintext;
}
ArenaReader reader(arena, plaintext, AssumeVersion(ProtocolVersion::withEncryptionAtRest()));
MutationRef mutation;
reader >> mutation;
return mutation;
}
MutationRef decrypt(const std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>>& cipherKeys,
Arena& arena,
BlobCipherMetrics::UsageType usageType,
@@ -180,15 +197,10 @@ struct MutationRef {
auto headerCipherItr = cipherKeys.find(header->cipherHeaderDetails);
ASSERT(textCipherItr != cipherKeys.end() && textCipherItr->second.isValid());
ASSERT(headerCipherItr != cipherKeys.end() && headerCipherItr->second.isValid());
DecryptBlobCipherAes256Ctr cipher(textCipherItr->second, headerCipherItr->second, header->iv, usageType);
StringRef plaintext = cipher.decrypt(param2.begin(), param2.size(), *header, arena)->toStringRef();
if (buf != nullptr) {
*buf = plaintext;
}
ArenaReader reader(arena, plaintext, AssumeVersion(ProtocolVersion::withEncryptionAtRest()));
MutationRef mutation;
reader >> mutation;
return mutation;
TextAndHeaderCipherKeys textAndHeaderKeys;
textAndHeaderKeys.cipherHeaderKey = headerCipherItr->second;
textAndHeaderKeys.cipherTextKey = textCipherItr->second;
return decrypt(textAndHeaderKeys, arena, usageType, buf);
}
// These masks define which mutation types have particular properties (they are used to implement
@@ -253,6 +265,11 @@ struct CommitTransactionRef {
VectorRef<KeyRangeRef> read_conflict_ranges;
VectorRef<KeyRangeRef> write_conflict_ranges;
VectorRef<MutationRef> mutations; // metadata mutations
// encryptedMutations should be in 1-1 correspondence with the mutations field above. That is, either
// encryptedMutations.size() == 0 or encryptedMutations.size() == mutations.size() and encryptedMutations[i] ==
// mutations[i].encrypt(). Currently this field is not serialized, so clients should NOT set it on the usual
// commit path. It is currently only used during backup mutation log restores.
VectorRef<Optional<MutationRef>> encryptedMutations;
Version read_snapshot = 0;
bool report_conflicting_keys = false;
bool lock_aware = false; // set when metadata mutations are present
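
A minimal sketch of the invariant described in the comment above, using only the fields shown here; the helper name is hypothetical and is not part of this PR. It mirrors the ASSERT added later in assignMutationsToStorageServers, which requires encryptedMutations to be either empty or the same length as mutations.

// Hypothetical consistency check for the encryptedMutations field described above.
bool encryptedMutationsWellFormed(const CommitTransactionRef& tr) {
	if (tr.encryptedMutations.size() == 0) {
		return true; // field unused: the normal client commit path
	}
	if (tr.encryptedMutations.size() != tr.mutations.size()) {
		return false; // must be a 1-1 correspondence when populated
	}
	for (int i = 0; i < tr.mutations.size(); ++i) {
		// A present entry must hold the encrypted form of mutations[i].
		if (tr.encryptedMutations[i].present() && !tr.encryptedMutations[i].get().isEncrypted()) {
			return false;
		}
	}
	return true;
}
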

View File

@@ -50,6 +50,7 @@
#include "fdbserver/RatekeeperInterface.h"
#include "fdbserver/RecoveryState.h"
#include "fdbserver/RestoreUtil.h"
#include "fdbserver/ServerDBInfo.actor.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "flow/ActorCollection.h"
@@ -61,6 +62,7 @@
#include "fdbclient/Tracing.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#include "flow/network.h"
ACTOR Future<Void> broadcastTxnRequest(TxnStateRequest req, int sendAmount, bool sendReply) {
state ReplyPromise<Void> reply = req.reply;
@@ -1240,25 +1242,47 @@ ACTOR Future<Void> applyMetadataToCommittedTransactions(CommitBatchContext* self
return Void();
}
void writeMutation(CommitBatchContext* self, int64_t tenantId, const MutationRef& mutation) {
ACTOR Future<MutationRef> writeMutation(CommitBatchContext* self,
int64_t tenantId,
const MutationRef* mutation,
Optional<MutationRef>* encryptedMutationOpt,
Arena* arena) {
static_assert(TenantInfo::INVALID_TENANT == INVALID_ENCRYPT_DOMAIN_ID);
if (self->pProxyCommitData->isEncryptionEnabled) {
EncryptCipherDomainId domainId = tenantId;
if (domainId == INVALID_ENCRYPT_DOMAIN_ID) {
std::pair<EncryptCipherDomainName, EncryptCipherDomainId> p =
getEncryptDetailsFromMutationRef(self->pProxyCommitData, mutation);
domainId = p.second;
state MutationRef encryptedMutation;
if (encryptedMutationOpt->present()) {
CODE_PROBE(true, "using already encrypted mutation");
encryptedMutation = encryptedMutationOpt->get();
ASSERT(encryptedMutation.isEncrypted());
// During simulation, check whether the encrypted mutation matches the decrypted mutation
if (g_network && g_network->isSimulated()) {
Reference<AsyncVar<ServerDBInfo> const> dbInfo = self->pProxyCommitData->db;
state const BlobCipherEncryptHeader* header = encryptedMutation.encryptionHeader();
TextAndHeaderCipherKeys cipherKeys =
wait(getEncryptCipherKeys(dbInfo, *header, BlobCipherMetrics::TLOG));
MutationRef decryptedMutation = encryptedMutation.decrypt(cipherKeys, *arena, BlobCipherMetrics::TLOG);
ASSERT(decryptedMutation.param1 == mutation->param1 && decryptedMutation.param2 == mutation->param2 &&
decryptedMutation.type == mutation->type);
}
} else {
if (domainId == INVALID_ENCRYPT_DOMAIN_ID) {
std::pair<EncryptCipherDomainName, EncryptCipherDomainId> p =
getEncryptDetailsFromMutationRef(self->pProxyCommitData, *mutation);
domainId = p.second;
CODE_PROBE(true, "Raw access mutation encryption");
CODE_PROBE(true, "Raw access mutation encryption");
}
ASSERT_NE(domainId, INVALID_ENCRYPT_DOMAIN_ID);
encryptedMutation = mutation->encrypt(self->cipherKeys, domainId, *arena, BlobCipherMetrics::TLOG);
}
ASSERT_NE(domainId, INVALID_ENCRYPT_DOMAIN_ID);
Arena arena;
self->toCommit.writeTypedMessage(mutation.encrypt(self->cipherKeys, domainId, arena, BlobCipherMetrics::TLOG));
self->toCommit.writeTypedMessage(encryptedMutation);
return encryptedMutation;
} else {
self->toCommit.writeTypedMessage(mutation);
self->toCommit.writeTypedMessage(*mutation);
return *mutation;
}
}
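
Because the hunk above interleaves the removed and the added bodies of writeMutation, a compact non-actor sketch of the new decision flow may be easier to follow. It is illustrative only: the real actor also decrypts and cross-checks a supplied ciphertext when running in simulation, and resolves the domain via getEncryptDetailsFromMutationRef for raw-access writes. The cipher-key map type is left generic here on purpose.

// Illustrative (non-actor) summary of what writeMutation() now writes to the TLog message.
template <class CipherKeyMap>
MutationRef chooseMutationToWrite(bool encryptionEnabled,
                                  const MutationRef& plaintext,
                                  const Optional<MutationRef>& alreadyEncrypted,
                                  EncryptCipherDomainId domainId,
                                  const CipherKeyMap& cipherKeys,
                                  Arena& arena) {
	if (!encryptionEnabled) {
		return plaintext; // unencrypted clusters keep the old single-path behavior
	}
	if (alreadyEncrypted.present()) {
		ASSERT(alreadyEncrypted.get().isEncrypted());
		return alreadyEncrypted.get(); // e.g. restored from a backup mutation log; avoid re-encrypting
	}
	ASSERT_NE(domainId, INVALID_ENCRYPT_DOMAIN_ID);
	return plaintext.encrypt(cipherKeys, domainId, arena, BlobCipherMetrics::TLOG);
}
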
@@ -1278,6 +1302,9 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
state Optional<ClientTrCommitCostEstimation>* trCost = &trs[self->transactionNum].commitCostEstimation;
state int mutationNum = 0;
state VectorRef<MutationRef>* pMutations = &trs[self->transactionNum].transaction.mutations;
state VectorRef<Optional<MutationRef>>* encryptedMutations =
&trs[self->transactionNum].transaction.encryptedMutations;
ASSERT(encryptedMutations->size() == 0 || encryptedMutations->size() == pMutations->size());
state int64_t tenantId = trs[self->transactionNum].tenantInfo.tenantId;
self->toCommit.addTransactionInfo(trs[self->transactionNum].spanContext);
@@ -1292,13 +1319,17 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
}
}
auto& m = (*pMutations)[mutationNum];
state MutationRef m = (*pMutations)[mutationNum];
state Optional<MutationRef> encryptedMutation =
encryptedMutations->size() > 0 ? (*encryptedMutations)[mutationNum] : Optional<MutationRef>();
state Arena arena;
state MutationRef writtenMutation;
self->mutationCount++;
self->mutationBytes += m.expectedSize();
self->yieldBytes += m.expectedSize();
ASSERT(!m.isEncrypted());
// Determine the set of tags (responsible storage servers) for the mutation, splitting it
// if necessary. Serialize (splits of) the mutation into the message buffer and add the tags.
if (isSingleKeyMutation((MutationRef::Type)m.type)) {
auto& tags = pProxyCommitData->tagsForKey(m.param1);
@@ -1336,7 +1367,11 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
if (pProxyCommitData->cacheInfo[m.param1]) {
self->toCommit.addTag(cacheTag);
}
writeMutation(self, tenantId, m);
if (encryptedMutation.present()) {
ASSERT(encryptedMutation.get().isEncrypted());
}
MutationRef tempMutation = wait(writeMutation(self, tenantId, &m, &encryptedMutation, &arena));
writtenMutation = tempMutation;
} else if (m.type == MutationRef::ClearRange) {
KeyRangeRef clearRange(KeyRangeRef(m.param1, m.param2));
auto ranges = pProxyCommitData->keyInfo.intersectingRanges(clearRange);
@@ -1389,7 +1424,8 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
if (pProxyCommitData->needsCacheTag(clearRange)) {
self->toCommit.addTag(cacheTag);
}
writeMutation(self, tenantId, m);
MutationRef tempMutation = wait(writeMutation(self, tenantId, &m, &encryptedMutation, &arena));
writtenMutation = tempMutation;
} else {
UNREACHABLE();
}
@@ -1403,7 +1439,9 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
if (m.type != MutationRef::Type::ClearRange) {
// Add the mutation to the relevant backup tag
for (auto backupName : pProxyCommitData->vecBackupKeys[m.param1]) {
self->logRangeMutations[backupName].push_back_deep(self->logRangeMutationsArena, m);
// If encryption is enabled, make sure the mutation we are writing is also encrypted
ASSERT(!self->pProxyCommitData->isEncryptionEnabled || writtenMutation.isEncrypted());
self->logRangeMutations[backupName].push_back_deep(self->logRangeMutationsArena, writtenMutation);
}
} else {
KeyRangeRef mutationRange(m.param1, m.param2);
@@ -1421,6 +1459,21 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
MutationRef backupMutation(
MutationRef::Type::ClearRange, intersectionRange.begin, intersectionRange.end);
// TODO (Nim): Currently clear ranges are encrypted using the default encryption key; this must be
// changed to account for clear ranges which span tenant boundaries
if (self->pProxyCommitData->isEncryptionEnabled) {
if (backupMutation.param1 == m.param1 && backupMutation.param2 == m.param2 &&
encryptedMutation.present()) {
backupMutation = encryptedMutation.get();
} else {
std::pair<EncryptCipherDomainName, EncryptCipherDomainId> p =
getEncryptDetailsFromMutationRef(self->pProxyCommitData, backupMutation);
EncryptCipherDomainId domainId = p.second;
backupMutation =
backupMutation.encrypt(self->cipherKeys, domainId, arena, BlobCipherMetrics::BACKUP);
}
}
// Add the mutation to the relevant backup tag
for (auto backupName : backupRange.value()) {
self->logRangeMutations[backupName].push_back_deep(self->logRangeMutationsArena,

View File

@@ -21,9 +21,12 @@
// This file implements the functions and actors used by the RestoreLoader role.
// The RestoreLoader role starts with the restoreLoaderCore actor
#include "fdbclient/BlobCipher.h"
#include "flow/UnitTest.h"
#include "fdbclient/BackupContainer.h"
#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/GetEncryptCipherKeys.actor.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbserver/RestoreLoader.actor.h"
#include "fdbserver/RestoreRoleCommon.actor.h"
#include "fdbserver/MutationTracking.h"
@@ -44,17 +47,19 @@ void splitMutation(const KeyRangeMap<UID>& krMap,
VectorRef<MutationRef>& mvector,
Arena& nodeIDs_arena,
VectorRef<UID>& nodeIDs);
void _parseSerializedMutation(KeyRangeMap<Version>* pRangeVersions,
std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
SerializedMutationListMap* mutationMap,
std::map<LoadingParam, SampledMutationsVec>::iterator samplesIter,
LoaderCounters* cc,
const RestoreAsset& asset);
ACTOR Future<Void> _parseSerializedMutation(KeyRangeMap<Version>* pRangeVersions,
std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
SerializedMutationListMap* mutationMap,
std::map<LoadingParam, SampledMutationsVec>::iterator samplesIter,
LoaderCounters* cc,
RestoreAsset asset,
Database cx);
void handleRestoreSysInfoRequest(const RestoreSysInfoRequest& req, Reference<RestoreLoaderData> self);
ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<RestoreLoaderData> self);
ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<RestoreLoaderData> self, Database cx);
ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequest req,
Reference<RestoreLoaderData> self);
Reference<RestoreLoaderData> self,
Database cx);
ACTOR Future<Void> sendMutationsToApplier(
std::priority_queue<RestoreLoaderSchedSendLoadParamRequest>* sendLoadParamQueue,
std::map<int, int>* inflightSendLoadParamReqs,
@@ -64,7 +69,8 @@ ACTOR Future<Void> sendMutationsToApplier(
RestoreAsset asset,
bool isRangeFile,
std::map<Key, UID>* pRangeToApplier,
std::map<UID, RestoreApplierInterface>* pApplierInterfaces);
std::map<UID, RestoreApplierInterface>* pApplierInterfaces,
Database cx);
ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(NotifiedVersion* pProcessedFileOffset,
SerializedMutationListMap* mutationMap,
Reference<IBackupContainer> bc,
@@ -85,7 +91,7 @@ ACTOR Future<Void> handleFinishVersionBatchRequest(RestoreVersionBatchRequest re
// Dispatch requests based on the node's busyness (i.e., cpu usage for now) and the requests' priorities.
// Requests for earlier version batches are preferred, which is equivalent to saying that
// sendMutationsRequests are preferred over loadingFileRequests
ACTOR Future<Void> dispatchRequests(Reference<RestoreLoaderData> self) {
ACTOR Future<Void> dispatchRequests(Reference<RestoreLoaderData> self, Database cx) {
try {
state int curVBInflightReqs = 0;
state int sendLoadParams = 0;
@@ -139,7 +145,7 @@ ACTOR Future<Void> dispatchRequests(Reference<RestoreLoaderData> self) {
// Dispatch the request if it is the next version batch to process or if cpu usage is low
if (req.batchIndex - 1 == self->finishedSendingVB ||
self->cpuUsage < SERVER_KNOBS->FASTRESTORE_SCHED_TARGET_CPU_PERCENT) {
self->addActor.send(handleSendMutationsRequest(req, self));
self->addActor.send(handleSendMutationsRequest(req, self, cx));
self->sendingQueue.pop();
}
}
@@ -204,7 +210,7 @@ ACTOR Future<Void> dispatchRequests(Reference<RestoreLoaderData> self) {
self->loadingQueue.pop();
ASSERT(false); // Check if this ever happens easily
} else {
self->addActor.send(handleLoadFileRequest(req, self));
self->addActor.send(handleLoadFileRequest(req, self, cx));
self->loadingQueue.pop();
lastLoadReqs++;
}
@@ -244,7 +250,7 @@ ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf,
actors.add(updateProcessMetrics(self));
actors.add(traceProcessMetrics(self, "RestoreLoader"));
self->addActor.send(dispatchRequests(self));
self->addActor.send(dispatchRequests(self, cx));
loop {
state std::string requestTypeStr = "[Init]";
@@ -361,6 +367,18 @@ void handleRestoreSysInfoRequest(const RestoreSysInfoRequest& req, Reference<Res
req.reply.send(RestoreCommonReply(self->id()));
}
ACTOR static Future<MutationRef> _decryptMutation(MutationRef mutation, Database cx, Arena* arena) {
ASSERT(mutation.isEncrypted());
Reference<AsyncVar<ClientDBInfo> const> dbInfo = cx->clientInfo;
state const BlobCipherEncryptHeader* header = mutation.encryptionHeader();
std::unordered_set<BlobCipherDetails> cipherDetails;
cipherDetails.insert(header->cipherHeaderDetails);
cipherDetails.insert(header->cipherTextDetails);
std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>> getCipherKeysResult =
wait(getEncryptCipherKeys(dbInfo, cipherDetails, BlobCipherMetrics::BACKUP));
return mutation.decrypt(getCipherKeysResult, *arena, BlobCipherMetrics::BACKUP);
}
// Parse a data block in a partitioned mutation log file and store mutations
// into "kvOpsIter" and samples into "samplesIter".
ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
@@ -370,7 +388,8 @@ ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
std::map<LoadingParam, SampledMutationsVec>::iterator samplesIter,
LoaderCounters* cc,
Reference<IBackupContainer> bc,
RestoreAsset asset) {
RestoreAsset asset,
Database cx) {
state Standalone<StringRef> buf = makeString(asset.len);
state Reference<IAsyncFile> file = wait(bc->readFile(asset.filename));
int rLen = wait(file->read(mutateString(buf), asset.len, asset.offset));
@@ -389,21 +408,21 @@ ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
wait(processedFileOffset->whenAtLeast(asset.offset));
ASSERT(processedFileOffset->get() == asset.offset);
Arena tempArena;
StringRefReader reader(buf, restore_corrupted_data());
state Arena tempArena;
state StringRefReader reader(buf, restore_corrupted_data());
try {
// Read block header
if (reader.consume<int32_t>() != PARTITIONED_MLOG_VERSION)
throw restore_unsupported_file_version();
VersionedMutationsMap& kvOps = kvOpsIter->second;
state VersionedMutationsMap* kvOps = &kvOpsIter->second;
while (1) {
// If eof reached or first key len bytes is 0xFF then end of block was reached.
if (reader.eof() || *reader.rptr == 0xFF)
break;
// Deserialize messages written in saveMutationsToFile().
LogMessageVersion msgVersion;
state LogMessageVersion msgVersion;
msgVersion.version = reader.consumeNetworkUInt64();
msgVersion.sub = reader.consumeNetworkUInt32();
int msgSize = reader.consumeNetworkInt32();
@@ -413,19 +432,20 @@ ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
if (!asset.isInVersionRange(msgVersion.version))
continue;
VersionedMutationsMap::iterator it;
state VersionedMutationsMap::iterator it;
bool inserted;
std::tie(it, inserted) = kvOps.emplace(msgVersion, MutationsVec());
std::tie(it, inserted) = kvOps->emplace(msgVersion, MutationsVec());
// A clear mutation can be split into multiple mutations with the same (version, sub).
// See saveMutationsToFile(). Current tests only use one key range per backup, thus
// only one clear mutation is generated (i.e., always inserted).
ASSERT(inserted);
ArenaReader rd(buf.arena(), StringRef(message, msgSize), AssumeVersion(g_network->protocolVersion()));
MutationRef mutation;
state MutationRef mutation;
rd >> mutation;
if (mutation.isEncrypted()) {
throw encrypt_unsupported();
MutationRef decryptedMutation = wait(_decryptMutation(mutation, cx, &tempArena));
mutation = decryptedMutation;
}
// Skip mutation whose commitVersion < range kv's version
@@ -500,12 +520,13 @@ ACTOR static Future<Void> parsePartitionedLogFileOnLoader(
std::map<LoadingParam, SampledMutationsVec>::iterator samplesIter,
LoaderCounters* cc,
Reference<IBackupContainer> bc,
RestoreAsset asset) {
RestoreAsset asset,
Database cx) {
state int readFileRetries = 0;
loop {
try {
wait(_parsePartitionedLogFileOnLoader(
pRangeVersions, processedFileOffset, kvOpsIter, samplesIter, cc, bc, asset));
pRangeVersions, processedFileOffset, kvOpsIter, samplesIter, cc, bc, asset, cx));
break;
} catch (Error& e) {
if (e.code() == error_code_restore_bad_read || e.code() == error_code_restore_unsupported_file_version ||
@@ -532,7 +553,8 @@ ACTOR Future<Void> _processLoadingParam(KeyRangeMap<Version>* pRangeVersions,
LoadingParam param,
Reference<LoaderBatchData> batchData,
UID loaderID,
Reference<IBackupContainer> bc) {
Reference<IBackupContainer> bc,
Database cx) {
// Temporary data structure for parsing log files into (version, <K, V, mutationType>)
// Must use StandAlone to save mutations, otherwise, the mutationref memory will be corrupted
// mutationMap: Key is the unique identifier for a batch of mutation logs at the same version
@@ -572,7 +594,8 @@ ACTOR Future<Void> _processLoadingParam(KeyRangeMap<Version>* pRangeVersions,
samplesIter,
&batchData->counters,
bc,
subAsset));
subAsset,
cx));
} else {
fileParserFutures.push_back(
parseLogFileToMutationsOnLoader(&processedFileOffset, &mutationMap, bc, subAsset));
@@ -582,8 +605,8 @@ ACTOR Future<Void> _processLoadingParam(KeyRangeMap<Version>* pRangeVersions,
wait(waitForAll(fileParserFutures));
if (!param.isRangeFile && !param.isPartitionedLog()) {
_parseSerializedMutation(
pRangeVersions, kvOpsPerLPIter, &mutationMap, samplesIter, &batchData->counters, param.asset);
wait(_parseSerializedMutation(
pRangeVersions, kvOpsPerLPIter, &mutationMap, samplesIter, &batchData->counters, param.asset, cx));
}
TraceEvent("FastRestoreLoaderProcessLoadingParamDone", loaderID)
@@ -594,7 +617,7 @@ ACTOR Future<Void> _processLoadingParam(KeyRangeMap<Version>* pRangeVersions,
}
// A loader can process multiple RestoreLoadFileRequest in parallel.
ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<RestoreLoaderData> self) {
ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<RestoreLoaderData> self, Database cx) {
state Reference<LoaderBatchData> batchData = self->batch[req.batchIndex];
state bool isDuplicated = true;
state bool printTrace = false;
@@ -623,7 +646,7 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R
.detail("ProcessLoadParam", req.param.toString());
ASSERT(batchData->sampleMutations.find(req.param) == batchData->sampleMutations.end());
batchData->processedFileParams[req.param] =
_processLoadingParam(&self->rangeVersions, req.param, batchData, self->id(), self->bc);
_processLoadingParam(&self->rangeVersions, req.param, batchData, self->id(), self->bc, cx);
self->inflightLoadingReqs++;
isDuplicated = false;
} else {
@@ -682,7 +705,8 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R
// Send buffered mutations to appliers.
// Do not need to block on low memory usage because this actor should not increase memory usage.
ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequest req,
Reference<RestoreLoaderData> self) {
Reference<RestoreLoaderData> self,
Database cx) {
state Reference<LoaderBatchData> batchData;
state Reference<LoaderBatchStatus> batchStatus;
state bool isDuplicated = true;
@@ -759,7 +783,8 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ
loadParam.asset,
loadParam.isRangeFile,
&batchData->rangeToApplier,
&self->appliersInterf));
&self->appliersInterf,
cx));
}
}
wait(waitForAll(fSendMutations));
@@ -812,7 +837,8 @@ ACTOR Future<Void> sendMutationsToApplier(
RestoreAsset asset,
bool isRangeFile,
std::map<Key, UID>* pRangeToApplier,
std::map<UID, RestoreApplierInterface>* pApplierInterfaces) {
std::map<UID, RestoreApplierInterface>* pApplierInterfaces,
Database cx) {
state VersionedMutationsMap& kvOps = *pkvOps;
state VersionedMutationsMap::iterator kvOp = kvOps.begin();
state int kvCount = 0;
@@ -820,6 +846,7 @@ ACTOR Future<Void> sendMutationsToApplier(
state Version msgIndex = 1; // Monotonically increased index for send message, must start at 1
state std::vector<UID> applierIDs = getApplierIDs(*pRangeToApplier);
state double msgSize = 0; // size of mutations in the message
state Arena arena;
// Wait for scheduler to kick it off
Promise<Void> toSched;
@@ -863,14 +890,18 @@ ACTOR Future<Void> sendMutationsToApplier(
for (auto& applierID : applierIDs) {
applierVersionedMutationsBuffer[applierID] = VersionedMutationsVec();
}
KeyRangeMap<UID> krMap;
state KeyRangeMap<UID> krMap;
buildApplierRangeMap(&krMap, pRangeToApplier);
for (kvOp = kvOps.begin(); kvOp != kvOps.end(); kvOp++) {
commitVersion = kvOp->first;
ASSERT(commitVersion.version >= asset.beginVersion);
ASSERT(commitVersion.version <= asset.endVersion); // endVersion is an empty commit to ensure progress
for (mIndex = 0; mIndex < kvOp->second.size(); mIndex++) {
MutationRef& kvm = kvOp->second[mIndex];
state MutationRef kvm = kvOp->second[mIndex];
if (kvm.isEncrypted()) {
MutationRef decryptedMutation = wait(_decryptMutation(kvm, cx, &arena));
kvm = decryptedMutation;
}
// Send the mutation to applier
if (isRangeMutation(kvm)) {
MutationsVec mvector;
@@ -1082,31 +1113,35 @@ bool concatenateBackupMutationForLogFile(SerializedMutationListMap* pMutationMap
// we may not get the entire mutation list for the version encoded_list_of_mutations:
// [mutation1][mutation2]...[mutationk], where
// a mutation is encoded as [type:uint32_t][keyLength:uint32_t][valueLength:uint32_t][keyContent][valueContent]
void _parseSerializedMutation(KeyRangeMap<Version>* pRangeVersions,
std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
SerializedMutationListMap* pmutationMap,
std::map<LoadingParam, SampledMutationsVec>::iterator samplesIter,
LoaderCounters* cc,
const RestoreAsset& asset) {
VersionedMutationsMap& kvOps = kvOpsIter->second;
SampledMutationsVec& samples = samplesIter->second;
SerializedMutationListMap& mutationMap = *pmutationMap;
ACTOR Future<Void> _parseSerializedMutation(KeyRangeMap<Version>* pRangeVersions,
std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
SerializedMutationListMap* pmutationMap,
std::map<LoadingParam, SampledMutationsVec>::iterator samplesIter,
LoaderCounters* cc,
RestoreAsset asset,
Database cx) {
state VersionedMutationsMap* kvOps = &kvOpsIter->second;
state SampledMutationsVec* samples = &samplesIter->second;
state SerializedMutationListMap::iterator mutationMapIterator = pmutationMap->begin();
TraceEvent(SevFRMutationInfo, "FastRestoreLoaderParseSerializedLogMutation")
.detail("BatchIndex", asset.batchIndex)
.detail("RestoreAsset", asset.toString());
Arena tempArena;
for (auto& m : mutationMap) {
StringRef k = m.first.contents();
StringRef val = m.second.first.contents();
state Arena tempArena;
loop {
if (mutationMapIterator == pmutationMap->end()) {
break;
}
StringRef k = mutationMapIterator->first.contents();
state StringRef val = mutationMapIterator->second.first.contents();
StringRefReader kReader(k, restore_corrupted_data());
uint64_t commitVersion = kReader.consume<uint64_t>(); // Consume little Endian data
state uint64_t commitVersion = kReader.consume<uint64_t>(); // Consume little Endian data
// We have already filtered out commits not in [beginVersion, endVersion) when we concatenated the kv pairs in the log file
ASSERT_WE_THINK(asset.isInVersionRange(commitVersion));
StringRefReader vReader(val, restore_corrupted_data());
state StringRefReader vReader(val, restore_corrupted_data());
vReader.consume<uint64_t>(); // Consume the includeVersion
// TODO(xumengpanda): verify the protocol version is compatible and raise error if needed
@@ -1114,72 +1149,79 @@ void _parseSerializedMutation(KeyRangeMap<Version>* pRangeVersions,
uint32_t val_length_decoded = vReader.consume<uint32_t>();
ASSERT(val_length_decoded == val.size() - sizeof(uint64_t) - sizeof(uint32_t));
int sub = 0;
while (1) {
state int sub = 0;
loop {
// stop when reach the end of the string
if (vReader.eof()) { //|| *reader.rptr == 0xFF
break;
}
uint32_t type = vReader.consume<uint32_t>();
uint32_t kLen = vReader.consume<uint32_t>();
uint32_t vLen = vReader.consume<uint32_t>();
const uint8_t* k = vReader.consume(kLen);
const uint8_t* v = vReader.consume(vLen);
state uint32_t type = vReader.consume<uint32_t>();
state uint32_t kLen = vReader.consume<uint32_t>();
state uint32_t vLen = vReader.consume<uint32_t>();
state const uint8_t* k = vReader.consume(kLen);
state const uint8_t* v = vReader.consume(vLen);
MutationRef mutation((MutationRef::Type)type, KeyRef(k, kLen), KeyRef(v, vLen));
state MutationRef mutation((MutationRef::Type)type, KeyRef(k, kLen), KeyRef(v, vLen));
if (mutation.isEncrypted()) {
MutationRef decryptedMutation = wait(_decryptMutation(mutation, cx, &tempArena));
mutation = decryptedMutation;
}
// Should this mutation be skipped?
// Skip mutation whose commitVersion < range kv's version
if (logMutationTooOld(pRangeVersions, mutation, commitVersion)) {
cc->oldLogMutations += 1;
continue;
}
if (mutation.param1 >= asset.range.end ||
(isRangeMutation(mutation) && mutation.param2 < asset.range.begin) ||
(!isRangeMutation(mutation) && mutation.param1 < asset.range.begin)) {
continue;
}
// Only apply mutation within the asset.range and apply removePrefix and addPrefix
ASSERT(asset.removePrefix.size() == 0);
if (isRangeMutation(mutation)) {
mutation.param1 = mutation.param1 >= asset.range.begin ? mutation.param1 : asset.range.begin;
mutation.param2 = mutation.param2 < asset.range.end ? mutation.param2 : asset.range.end;
// Remove prefix or add prefix if we restore data to a new key space
if (asset.hasPrefix()) { // Avoid creating new Key
mutation.param1 =
mutation.param1.removePrefix(asset.removePrefix).withPrefix(asset.addPrefix, tempArena);
mutation.param2 =
mutation.param2.removePrefix(asset.removePrefix).withPrefix(asset.addPrefix, tempArena);
}
} else {
if (asset.hasPrefix()) { // Avoid creating new Key
mutation.param1 =
mutation.param1.removePrefix(asset.removePrefix).withPrefix(asset.addPrefix, tempArena);
if (mutation.param1 >= asset.range.end ||
(isRangeMutation(mutation) && mutation.param2 < asset.range.begin) ||
(!isRangeMutation(mutation) && mutation.param1 < asset.range.begin)) {
} else {
// Only apply mutation within the asset.range and apply removePrefix and addPrefix
ASSERT(asset.removePrefix.size() == 0);
if (isRangeMutation(mutation)) {
mutation.param1 = mutation.param1 >= asset.range.begin ? mutation.param1 : asset.range.begin;
mutation.param2 = mutation.param2 < asset.range.end ? mutation.param2 : asset.range.end;
// Remove prefix or add prefix if we restore data to a new key space
if (asset.hasPrefix()) { // Avoid creating new Key
mutation.param1 =
mutation.param1.removePrefix(asset.removePrefix).withPrefix(asset.addPrefix, tempArena);
mutation.param2 =
mutation.param2.removePrefix(asset.removePrefix).withPrefix(asset.addPrefix, tempArena);
}
} else {
if (asset.hasPrefix()) { // Avoid creating new Key
mutation.param1 =
mutation.param1.removePrefix(asset.removePrefix).withPrefix(asset.addPrefix, tempArena);
}
}
cc->loadedLogBytes += mutation.totalSize();
TraceEvent(SevFRMutationInfo, "FastRestoreDecodeLogFile")
.detail("CommitVersion", commitVersion)
.detail("ParsedMutation", mutation.toString());
auto it = kvOps->insert(std::make_pair(LogMessageVersion(commitVersion, sub++), MutationsVec()));
ASSERT(it.second); // inserted is true
ASSERT(sub <
std::numeric_limits<int32_t>::max()); // range file mutation uses int32_max as subversion
it.first->second.push_back_deep(it.first->second.arena(), mutation);
// Sampling data similar to how SS sample bytes
ByteSampleInfo sampleInfo = isKeyValueInSample(KeyValueRef(mutation.param1, mutation.param2));
if (sampleInfo.inSample) {
cc->sampledLogBytes += sampleInfo.sampledSize;
samples->push_back_deep(samples->arena(),
SampledMutation(mutation.param1, sampleInfo.sampledSize));
}
ASSERT_WE_THINK(kLen >= 0 && kLen < val.size());
ASSERT_WE_THINK(vLen >= 0 && vLen < val.size());
}
}
cc->loadedLogBytes += mutation.totalSize();
TraceEvent(SevFRMutationInfo, "FastRestoreDecodeLogFile")
.detail("CommitVersion", commitVersion)
.detail("ParsedMutation", mutation.toString());
auto it = kvOps.insert(std::make_pair(LogMessageVersion(commitVersion, sub++), MutationsVec()));
ASSERT(it.second); // inserted is true
ASSERT(sub < std::numeric_limits<int32_t>::max()); // range file mutation uses int32_max as subversion
it.first->second.push_back_deep(it.first->second.arena(), mutation);
// Sampling data similar to how SS sample bytes
ByteSampleInfo sampleInfo = isKeyValueInSample(KeyValueRef(mutation.param1, mutation.param2));
if (sampleInfo.inSample) {
cc->sampledLogBytes += sampleInfo.sampledSize;
samples.push_back_deep(samples.arena(), SampledMutation(mutation.param1, sampleInfo.sampledSize));
}
ASSERT_WE_THINK(kLen >= 0 && kLen < val.size());
ASSERT_WE_THINK(vLen >= 0 && vLen < val.size());
}
mutationMapIterator++;
}
return Void();
}
// Parsing the data blocks in a range file

View File

@@ -183,10 +183,15 @@ public:
void setEncryptKeyProxy(const EncryptKeyProxyInterface& interf) {
auto newInfo = serverInfo->get();
auto newClientInfo = clientInfo->get();
newClientInfo.id = deterministicRandom()->randomUniqueID();
newInfo.id = deterministicRandom()->randomUniqueID();
newInfo.infoGeneration = ++dbInfoCount;
newInfo.encryptKeyProxy = interf;
newInfo.client.encryptKeyProxy = interf;
newClientInfo.encryptKeyProxy = interf;
serverInfo->set(newInfo);
clientInfo->set(newClientInfo);
}
void setConsistencyScan(const ConsistencyScanInterface& interf) {
@@ -199,7 +204,9 @@ public:
void clearInterf(ProcessClass::ClassType t) {
auto newInfo = serverInfo->get();
auto newClientInfo = clientInfo->get();
newInfo.id = deterministicRandom()->randomUniqueID();
newClientInfo.id = deterministicRandom()->randomUniqueID();
newInfo.infoGeneration = ++dbInfoCount;
if (t == ProcessClass::DataDistributorClass) {
newInfo.distributor = Optional<DataDistributorInterface>();
@@ -209,10 +216,13 @@ public:
newInfo.blobManager = Optional<BlobManagerInterface>();
} else if (t == ProcessClass::EncryptKeyProxyClass) {
newInfo.encryptKeyProxy = Optional<EncryptKeyProxyInterface>();
newInfo.client.encryptKeyProxy = Optional<EncryptKeyProxyInterface>();
newClientInfo.encryptKeyProxy = Optional<EncryptKeyProxyInterface>();
} else if (t == ProcessClass::ConsistencyScanClass) {
newInfo.consistencyScan = Optional<ConsistencyScanInterface>();
}
serverInfo->set(newInfo);
clientInfo->set(newClientInfo);
}
ACTOR static Future<Void> countClients(DBInfo* self) {

View File

@@ -270,7 +270,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
StringRef(backupContainer),
{},
deterministicRandom()->randomInt(0, 60),
deterministicRandom()->randomInt(0, 100),
deterministicRandom()->randomInt(0, 2000),
tag.toString(),
backupRanges,
StopWhenDone{ !stopDifferentialDelay },