Fix a race that can cause recovery to be stuck (#12212)

* Fix a race that can causes recovery to be stuck

When purging old generations, these no longer needed generations are removed in
the in-memory LogSystem data structure. Then the change is made durable on
coordinators.

If there is a recovery happened before the change is durable, and ServerDBInfo
broadcast is sent to the old tlogs and they can be displaced/removed. As a
result, the recovery will become stuck waiting for locking these old tlogs.

This PR updates the purging so that it never directly changes the in-memory data
structure and only modifies states on coordinators. So next recovery will pick
up the change.

* Add in-memory purging of old generation after writing coordinators

This allows old tlogs to be removed after CC purges them.

20250628-180019-jzhou-69511e1cbc01ab08
This commit is contained in:
Jingyu Zhou
2025-06-30 16:21:45 -07:00
committed by GitHub
parent e6b9ef7526
commit 774c792e14
4 changed files with 36 additions and 16 deletions

View File

@@ -461,8 +461,11 @@ ACTOR Future<Void> trackTlogRecovery(Reference<ClusterRecoveryData> self,
self->configuration; // self-configuration can be changed by configurationMonitor so we need a copy
loop {
state DBCoreState newState;
self->logSystem->purgeOldRecoveredGenerations();
self->logSystem->toCoreState(newState);
// We can't purge old generations until we have the new state durable on coordinators,
// otherwise old tlogs can be removed before the new state is written,
// which may cause immediate recovery to stuck in locking old tlogs.
self->logSystem->purgeOldRecoveredGenerationsCoreState(newState);
newState.recoveryCount = recoverCount;
// Update Coordinators EncryptionAtRest status during the very first recovery of the cluster (empty database)
@@ -486,6 +489,8 @@ ACTOR Future<Void> trackTlogRecovery(Reference<ClusterRecoveryData> self,
configuration.expectedLogSets(self->primaryDcId.size() ? self->primaryDcId[0] : Optional<Key>()))
.detail("RecoveryCount", newState.recoveryCount);
wait(self->cstate.write(newState, finalUpdate));
// Purge in memory state after durability to avoid race conditions.
self->logSystem->purgeOldRecoveredGenerationsInMemory(newState);
if (self->cstateUpdated.canBeSet()) {
self->cstateUpdated.send(Void());
}

View File

@@ -333,19 +333,20 @@ Reference<ILogSystem> TagPartitionedLogSystem::fromOldLogSystemConfig(UID const&
return logSystem;
}
void TagPartitionedLogSystem::purgeOldRecoveredGenerations() {
void TagPartitionedLogSystem::purgeOldRecoveredGenerationsCoreState(DBCoreState& newState) {
Version oldestGenerationRecoverAtVersion = std::min(recoveredVersion->get(), remoteRecoveredVersion->get());
TraceEvent("ToCoreStateOldestGenerationRecoverAtVersion")
.detail("RecoveredVersion", recoveredVersion->get())
.detail("RemoteRecoveredVersion", remoteRecoveredVersion->get())
.detail("OldestBackupEpoch", oldestBackupEpoch);
for (int i = 0; i < oldLogData.size(); ++i) {
const auto& oldData = oldLogData[i];
for (int i = 0; i < newState.oldTLogData.size(); ++i) {
const auto& oldData = newState.oldTLogData[i];
// Remove earlier generation that TLog data are
// - consumed by all storage servers
// - no longer used by backup workers
if (oldData.recoverAt < oldestGenerationRecoverAtVersion && oldData.epoch < oldestBackupEpoch) {
if (g_network->isSimulated()) {
ASSERT(oldLogData.size() == newState.oldTLogData.size());
for (int j = 0; j < oldLogData.size(); ++j) {
TraceEvent("AllOldGenerations")
.detail("Index", j)
@@ -353,25 +354,36 @@ void TagPartitionedLogSystem::purgeOldRecoveredGenerations() {
.detail("Begin", oldLogData[j].epochBegin)
.detail("RecoverAt", oldLogData[j].recoverAt);
}
for (int j = i + 1; j < oldLogData.size(); ++j) {
ASSERT(oldLogData[j].recoverAt < oldestGenerationRecoverAtVersion);
ASSERT(oldData.tLogs[0]->backupWorkers.size() == 0 || oldLogData[j].epoch < oldestBackupEpoch);
for (int j = i + 1; j < newState.oldTLogData.size(); ++j) {
ASSERT(newState.oldTLogData[j].recoverAt < oldestGenerationRecoverAtVersion);
ASSERT(oldLogData[i].tLogs[0]->backupWorkers.size() == 0 ||
newState.oldTLogData[j].epoch < oldestBackupEpoch);
}
}
for (int j = i; j < oldLogData.size(); ++j) {
TraceEvent("PurgeOldTLogGeneration")
.detail("Begin", oldLogData[j].epochBegin)
.detail("End", oldLogData[j].epochEnd)
.detail("Epoch", oldLogData[j].epoch)
.detail("RecoverAt", oldLogData[j].recoverAt)
for (int j = i; j < newState.oldTLogData.size(); ++j) {
TraceEvent("PurgeOldTLogGenerationCoreState", dbgid)
.detail("Begin", newState.oldTLogData[j].epochBegin)
.detail("End", newState.oldTLogData[j].epochEnd)
.detail("Epoch", newState.oldTLogData[j].epoch)
.detail("RecoverAt", newState.oldTLogData[j].recoverAt)
.detail("Index", j);
}
oldLogData.resize(i);
newState.oldTLogData.resize(i);
break;
}
}
}
void TagPartitionedLogSystem::purgeOldRecoveredGenerationsInMemory(const DBCoreState& newState) {
auto generations = newState.oldTLogData.size();
if (generations < oldLogData.size()) {
TraceEvent("PurgeOldTLogGenerationsInMemory", dbgid)
.detail("OldGenerations", oldLogData.size())
.detail("NewGenerations", generations);
oldLogData.resize(generations);
}
}
void TagPartitionedLogSystem::toCoreState(DBCoreState& newState) const {
if (recoveryComplete.isValid() && recoveryComplete.isError())
throw recoveryComplete.getError();
@@ -2391,6 +2403,7 @@ ACTOR Future<Void> TagPartitionedLogSystem::epochEnd(Reference<AsyncVar<Referenc
.detail("PrimaryLocality", primaryLocality)
.detail("RemoteLocality", remoteLocality)
.detail("FoundRemote", foundRemote)
.detail("ForceRecovery", *forceRecovery)
.detail("Modified", modifiedLogSets)
.detail("Removed", removedLogSets);
for (int i = 0; i < prevState.tLogs.size(); i++) {

View File

@@ -501,7 +501,8 @@ struct ILogSystem {
virtual bool remoteStorageRecovered() const = 0;
virtual void purgeOldRecoveredGenerations() = 0;
virtual void purgeOldRecoveredGenerationsCoreState(DBCoreState&) = 0;
virtual void purgeOldRecoveredGenerationsInMemory(const DBCoreState&) = 0;
virtual Future<Void> onCoreStateChanged() const = 0;
// Returns if and when the output of toCoreState() would change (for example, when older logs can be discarded from

View File

@@ -199,7 +199,8 @@ struct TagPartitionedLogSystem final : ILogSystem, ReferenceCounted<TagPartition
bool remoteStorageRecovered() const final;
// Checks older TLog generations and remove no longer needed generations from the log system.
void purgeOldRecoveredGenerations() final;
void purgeOldRecoveredGenerationsCoreState(DBCoreState&) final;
void purgeOldRecoveredGenerationsInMemory(const DBCoreState&) final;
Future<Void> onCoreStateChanged() const final;